/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.solr.cloud;

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.handler.ReplicationHandler;
import org.apache.solr.update.processor.DocExpirationUpdateProcessorFactory;
import org.apache.solr.util.SecurityJson;
import org.apache.solr.util.TimeOut;
import org.junit.After;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Test of {@link DocExpirationUpdateProcessorFactory} in a cloud setup */
@LuceneTestCase.Nightly // Has to do some sleeping to wait for a future expiration
public class DistribDocExpirationUpdateProcessorTest extends SolrCloudTestCase {

  // Logger for this test class (class resolved via MethodHandles instead of a class literal)
  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  // Per-test state: assigned by setupCluster(boolean) and reset to null by cleanup().
  // Name of the collection the current test operates on
  private String COLLECTION = null;
  // Basic-auth user name; stays null when the cluster is set up without security
  private String USER = null;
  // Basic-auth password; setAuthIfNeeded asserts it is non-null whenever USER is non-null
  private String PASS = null;

  /**
   * Shuts down the cluster after each test method and clears the per-test collection and
   * credential fields.
   *
   * @throws Exception if shutting down the cluster fails
   */
  @After
  public void cleanup() throws Exception {
    try {
      shutdownCluster();
    } finally {
      // reset even when the shutdown fails, so stale names/credentials are never left behind
      COLLECTION = null;
      USER = null;
      PASS = null;
    }
  }

  /**
   * Adds basic-auth credentials to the request when the current test has a {@link #USER}
   * configured; otherwise the request is left untouched.
   *
   * @param req the request to (possibly) decorate with credentials
   * @return the same request instance that was passed in, to allow call chaining
   */
  private <T extends SolrRequest<?>> T setAuthIfNeeded(T req) {
    final boolean authConfigured = (USER != null);
    if (!authConfigured) {
      return req;
    }

    // a user without a password means the test setup itself is broken
    assertNotNull(PASS);
    req.setBasicAuthCredentials(USER, PASS);
    return req;
  }

  /**
   * Starts a 4 node cluster with the "doc-expiry" configset, then creates the test collection (2
   * shards, 2 replicas each) and waits for all of its replicas to become active.
   *
   * @param security when true the cluster is configured with {@link SecurityJson#SIMPLE}, the
   *     credentials used by {@link #setAuthIfNeeded} are recorded, and the collection name gets a
   *     "_secure" suffix
   * @throws Exception if the cluster or the collection can not be set up
   */
  public void setupCluster(boolean security) throws Exception {
    final int numShards = 2;
    final int replicasPerShard = 2;
    final String baseCollectionName = "expiring";

    // we want at most one core per node to force lots of network traffic to try and tickle
    // distributed bugs
    final MiniSolrCloudCluster.Builder clusterBuilder =
        configureCluster(4)
            .addConfig(
                "conf", TEST_PATH().resolve("configsets").resolve("doc-expiry").resolve("conf"));

    if (security) {
      USER = SecurityJson.USER;
      PASS = SecurityJson.PASS;
      COLLECTION = baseCollectionName + "_secure";

      clusterBuilder.withSecurityJson(SecurityJson.SIMPLE);
    } else {
      COLLECTION = baseCollectionName;
    }
    clusterBuilder.configure();

    final CollectionAdminRequest.Create createRequest =
        setAuthIfNeeded(
            CollectionAdminRequest.createCollection(
                COLLECTION, "conf", numShards, replicasPerShard));
    createRequest.process(cluster.getSolrClient());

    waitForState(
        "Waiting for collection creation",
        COLLECTION,
        (liveNodes, collectionState) ->
            SolrCloudTestCase.replicasForCollectionAreFullyActive(
                liveNodes, collectionState, numShards, replicasPerShard));
  }

  /** Runs the expiration scenario against a cluster that has no security configured. */
  @Test
  public void testNoAuth() throws Exception {
    final boolean withSecurity = false;
    setupCluster(withSecurity);
    runTest();
  }

  /**
   * Runs the expiration scenario against a cluster secured with basic auth, after first confirming
   * that an unauthenticated query is rejected with a 401.
   */
  @Test
  public void testBasicAuth() throws Exception {
    final boolean withSecurity = true;
    setupCluster(withSecurity);

    // check that our cluster really does require authentication
    final SolrException unauthenticatedFailure =
        expectThrows(
            SolrException.class,
            () -> {
              // the result is irrelevant: the request itself is expected to be rejected
              cluster
                  .getSolrClient()
                  .query(
                      COLLECTION,
                      params(
                          "q", "*:*",
                          "rows", "0",
                          "_trace", "no_auth_check"))
                  .getResults()
                  .getNumFound();
            });
    assertEquals("check of unauthenticated request", 401, unauthenticatedFailure.code());

    runTest();
  }

  /**
   * The scenario shared by every test method; expects {@link #setupCluster} to have been called.
   *
   * <ol>
   *   <li>Indexes a random mix of docs, some with a 1 second TTL and some that never expire, and
   *       waits for the expiring ones to disappear.
   *   <li>Records the index version and doc count of every replica.
   *   <li>Adds one "special" doc with a 30 second TTL and waits for it to expire.
   *   <li>Verifies that only the replicas of a single shard changed, that all replicas of each
   *       shard report the same doc count, and that the total doc count matches the number of
   *       docs that never expire.
   * </ol>
   *
   * @throws Exception on any request failure or interruption while waiting
   */
  private void runTest() throws Exception {
    final int totalNumDocs = atLeast(50);

    // Add a bunch of docs; some with extremely short expiration, some with no expiration
    // these should be randomly distributed to each shard
    long numDocsThatNeverExpire = 0;
    {
      final UpdateRequest req = setAuthIfNeeded(new UpdateRequest());
      for (int i = 1; i <= totalNumDocs; i++) {
        final SolrInputDocument doc = sdoc("id", i);

        if (random().nextBoolean()) {
          doc.addField("should_expire_s", "yup");
          doc.addField("tTl_s", "+1SECONDS");
        } else {
          numDocsThatNeverExpire++;
        }

        req.add(doc);
      }
      req.commit(cluster.getSolrClient(), COLLECTION);
    }

    // NOTE: don't assume we can find exactly totalNumDocs right now, some may have already been
    // deleted...

    // it should not take long for us to get to the point where all 'should_expire_s:yup' docs are
    // gone
    waitForNoResults(
        30, params("q", "should_expire_s:yup", "rows", "0", "_trace", "init_batch_check"));

    {
      // ...*NOW* we can assert that exactly numDocsThatNeverExpire should exist...
      final QueryRequest req =
          setAuthIfNeeded(
              new QueryRequest(
                  params(
                      "q", "*:*",
                      "rows", "0",
                      "_trace", "count_non_expire_docs")));

      // NOTE: it's possible that replicas could be out of sync but this query may get lucky and
      // only hit leaders.  we'll compare the counts of every replica in every shard later on...
      assertEquals(
          numDocsThatNeverExpire,
          req.process(cluster.getSolrClient(), COLLECTION).getResults().getNumFound());
    }

    // now that we've confirmed the basics work, let's check some fine grain stuff...

    // first off, check that this special docId doesn't somehow already exist
    waitForNoResults(0, params("q", "id:special99", "rows", "0", "_trace", "sanity_check99"));

    {
      // force a hard commit on all shards (the prior auto-expire would have only done a soft
      // commit) so we can ensure our indexVersion won't change unnecessarily on the un-affected
      // shard when we add & (hard) commit our special doc...
      final UpdateRequest req = setAuthIfNeeded(new UpdateRequest());
      req.commit(cluster.getSolrClient(), COLLECTION);
    }

    // record important data for each replica core, so we can check later that it only changes for
    // the replicas of a single shard after we add/expire a single special doc
    log.info("Fetching ReplicaData BEFORE special doc addition/expiration");
    final Map<String, ReplicaData> initReplicaData = getTestDataForAllReplicas();
    assertTrue("WTF? no replica data?", 0 < initReplicaData.size());

    // add & hard commit a special doc with a short TTL
    setAuthIfNeeded(new UpdateRequest())
        .add(sdoc("id", "special99", "should_expire_s", "yup", "tTl_s", "+30SECONDS"))
        .commit(cluster.getSolrClient(), COLLECTION);

    // wait for our special docId to be deleted
    waitForNoResults(
        180, params("q", "id:special99", "rows", "0", "_trace", "did_special_doc_expire_yet"));

    // now check all the replicas to verify a few things:
    // - only the replicas of one shard changed -- no unnecessary churn on other shards
    // - every replica of each single shard should have the same number of docs
    // - the total number of docs should match numDocsThatNeverExpire
    log.info("Fetching ReplicaData AFTER special doc addition/expiration");
    final Map<String, ReplicaData> finalReplicaData = getTestDataForAllReplicas();
    assertEquals("WTF? not same num replicas?", initReplicaData.size(), finalReplicaData.size());

    final Set<String> coresThatChange = new HashSet<>();
    final Set<String> shardsThatChange = new HashSet<>();

    int coresCompared = 0;
    long totalDocsOnAllShards = 0;
    final DocCollection collectionState =
        cluster.getSolrClient().getClusterState().getCollection(COLLECTION);
    for (Slice shard : collectionState) {
      boolean firstReplica = true;
      // doc count of the first replica seen for this shard; only meaningful once
      // firstReplica has been flipped to false
      long firstReplicaNumDocs = -1L;
      for (Replica replica : shard) {
        coresCompared++;
        assertEquals(shard.getName(), replica.getShard()); // sanity check
        final String core = replica.getCoreName();
        final ReplicaData initData = initReplicaData.get(core);
        final ReplicaData finalData = finalReplicaData.get(core);
        assertNotNull(shard.getName() + ": no init data for core: " + core, initData);
        assertNotNull(shard.getName() + ": no final data for core: " + core, finalData);

        if (!initData.equals(finalData)) {
          log.error("ReplicaData changed: {} != {}", initData, finalData);
          coresThatChange.add(core + "(" + shard.getName() + ")");
          shardsThatChange.add(shard.getName());
        }

        if (firstReplica) {
          // each shard contributes to the total only once, no matter how many replicas it has
          firstReplicaNumDocs = finalData.numDocs;
          totalDocsOnAllShards += finalData.numDocs;
          firstReplica = false;
        } else {
          // every other replica of the shard must agree with the one that was counted
          assertEquals(
              shard.getName() + ": replicas disagree on numDocs, core: " + core,
              firstReplicaNumDocs,
              finalData.numDocs);
        }
      }
    }

    assertEquals(
        "Exactly one shard should have changed, instead: "
            + shardsThatChange
            + " cores=("
            + coresThatChange
            + ")",
        1,
        shardsThatChange.size());
    assertEquals("somehow we missed some cores?", initReplicaData.size(), coresCompared);

    assertEquals(
        "Final tally has incorrect numDocsThatNeverExpire",
        numDocsThatNeverExpire,
        totalDocsOnAllShards);

    // TODO: above logic verifies that deleteByQuery happens on all nodes, and ...
    // doesn't affect searcher re-open on shards w/o expired docs ... can we also verify
    // that *only* one node is sending the deletes ?
    // (ie: no flood of redundant deletes?)
  }

  /**
   * Asks every replica of the test collection directly (not via a distributed request) for its
   * current index version and its local document count.
   *
   * @return a map whose key is the core name of a replica (see {@link Replica#getCoreName()}) and
   *     whose value is the data about that core needed for the test
   * @throws IOException if communication with a replica fails
   * @throws SolrServerException if a replica reports an error
   */
  private Map<String, ReplicaData> getTestDataForAllReplicas()
      throws IOException, SolrServerException {
    Map<String, ReplicaData> results = new HashMap<>();

    DocCollection collectionState =
        cluster.getSolrClient().getClusterState().getCollection(COLLECTION);

    for (Replica replica : collectionState.getReplicas()) {

      String coreName = replica.getCoreName();
      try (SolrClient client = getHttpSolrClient(replica)) {

        // the replication handler's "indexversion" command reports the version of this core
        ModifiableSolrParams params = new ModifiableSolrParams();
        params.set("command", "indexversion");
        params.set("_trace", "getIndexVersion");
        params.set("qt", ReplicationHandler.PATH);
        QueryRequest req = setAuthIfNeeded(new QueryRequest(params));

        NamedList<Object> res = client.request(req);
        assertNotNull("null response from server: " + coreName, res);

        Object version = res.get("indexversion");
        assertNotNull("null version from server: " + coreName, version);
        assertTrue("version isn't a long: " + coreName, version instanceof Long);

        // distrib=false: count only the docs held by this particular core
        long numDocs =
            setAuthIfNeeded(
                    new QueryRequest(
                        params(
                            "q", "*:*",
                            "distrib", "false",
                            "rows", "0",
                            "_trace", "counting_docs")))
                .process(client)
                .getResults()
                .getNumFound();

        final ReplicaData data =
            new ReplicaData(replica.getShard(), coreName, (Long) version, numDocs);
        log.info("{}", data);

        // the map is keyed by core name: a repeated name would silently hide a replica
        assertNull("duplicate core name: " + coreName, results.put(coreName, data));
      }
    }

    return results;
  }

  /**
   * Repeatedly runs the query against the cluster's client until numFound is 0 or the time limit
   * is exceeded, then asserts that nothing was found. The pause between two attempts is 5 seconds,
   * or whatever is left of the time limit if that is less (but never below 1 millisecond). The
   * query is always executed at least once, even with a time limit of 0.
   *
   * @param maxTimeLimitSeconds how long to keep retrying, in seconds
   * @param params the query to execute
   */
  private void waitForNoResults(int maxTimeLimitSeconds, SolrParams params)
      throws SolrServerException, InterruptedException, IOException {

    final QueryRequest query = setAuthIfNeeded(new QueryRequest(params));
    final TimeOut deadline =
        new TimeOut(maxTimeLimitSeconds, TimeUnit.SECONDS, TimeSource.NANO_TIME);

    long matches;
    while (true) {
      matches = query.process(cluster.getSolrClient(), COLLECTION).getResults().getNumFound();
      if (matches <= 0L || deadline.hasTimedOut()) {
        break;
      }

      final long remainingMillis = deadline.timeLeft(TimeUnit.MILLISECONDS);
      final long pauseMillis = Math.max(1L, Math.min(5000L, remainingMillis));
      Thread.sleep(pauseMillis);
    }

    assertEquals("Give up waiting for no results: " + params, 0L, matches);
  }

  /**
   * Immutable snapshot of one replica core: the shard it belongs to, its core name, its index
   * version and its document count. Two snapshots are equal only if all four values match, which
   * lets the test detect any change of a core between two points in time.
   */
  private static class ReplicaData {
    public final String shardName;
    public final String coreName;
    public final long indexVersion;
    public final long numDocs;

    /** Creates a snapshot; the shard and core names must not be null. */
    public ReplicaData(
        final String shardName,
        final String coreName,
        final long indexVersion,
        final long numDocs) {
      assertNotNull(shardName);
      assertNotNull(coreName);

      this.numDocs = numDocs;
      this.indexVersion = indexVersion;
      this.coreName = coreName;
      this.shardName = shardName;
    }

    @Override
    public String toString() {
      final StringBuilder text = new StringBuilder("ReplicaData(shard=");
      text.append(shardName);
      text.append(",core=").append(coreName);
      text.append(",indexVer=").append(indexVersion);
      text.append(",numDocs=").append(numDocs);
      return text.append(")").toString();
    }

    @Override
    public boolean equals(Object other) {
      if (!(other instanceof ReplicaData that)) {
        return false;
      }
      final boolean sameCore =
          shardName.equals(that.shardName) && coreName.equals(that.coreName);
      final boolean sameState = indexVersion == that.indexVersion && numDocs == that.numDocs;
      return sameCore && sameState;
    }

    @Override
    public int hashCode() {
      return Objects.hash(shardName, coreName, indexVersion, numDocs);
    }
  }
}
